-
Notifications
You must be signed in to change notification settings - Fork 10.8k
Add unified jobs API with /api/jobs endpoints #11054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
ae57e94 to
c4b7a84
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
likely out of scope and should be done in separate pass, but can we break up the routes in this file to separate files. Over 1000 lines is bananas
execution.py
Outdated
| def _normalize_queue_item(self, item, status): | ||
| """Convert queue item tuple to unified job dict.""" | ||
| _, prompt_id, _, extra_data, _ = item[:5] | ||
| create_time = extra_data.get('create_time') | ||
|
|
||
| return { | ||
| 'id': prompt_id, | ||
| 'status': status, | ||
| 'create_time': create_time, | ||
| 'execution_time': None, | ||
| 'error_message': None, | ||
| 'outputs_count': 0, | ||
| 'preview_output': None, | ||
| 'workflow_id': None, | ||
| } | ||
|
|
||
| def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): | ||
| """Convert history item dict to unified job dict.""" | ||
| prompt_tuple = history_item['prompt'] | ||
| _, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] | ||
| create_time = extra_data.get('create_time') | ||
|
|
||
| # Determine status from history status | ||
| status_info = history_item.get('status', {}) | ||
| if status_info: | ||
| status = 'completed' if status_info.get('status_str') == 'success' else 'error' | ||
| else: | ||
| status = 'completed' | ||
|
|
||
| outputs = history_item.get('outputs', {}) | ||
|
|
||
| outputs_count, preview_output = self._get_outputs_summary(outputs) | ||
|
|
||
| error_message = None | ||
| if status == 'error' and status_info: | ||
| messages = status_info.get('messages', []) | ||
| if messages: | ||
| error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) | ||
|
|
||
| execution_time = history_item.get('execution_time') | ||
|
|
||
| job = { | ||
| 'id': prompt_id, | ||
| 'status': status, | ||
| 'create_time': create_time, | ||
| 'execution_time': execution_time, | ||
| 'error_message': error_message, | ||
| 'outputs_count': outputs_count, | ||
| 'preview_output': preview_output, | ||
| 'workflow_id': None, | ||
| } | ||
|
|
||
| if include_outputs: | ||
| job['outputs'] = outputs | ||
| job['prompt'] = prompt | ||
| job['extra_data'] = extra_data | ||
| job['outputs_to_execute'] = outputs_to_execute | ||
|
|
||
| return job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a lot of these private methods appear to be static methods. To keep this class thinner, can we break out the static methods to a separate file, and move outside the class to be regular functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar comment to before -- likely out of scope for this, but can we start introducing some structure to this repo and start breaking up this superclass to separate files for maintainability and extensibility
execution.py
Outdated
| jobs = [] | ||
|
|
||
| if status_filter is None: | ||
| status_filter = ['pending', 'in_progress', 'completed', 'error'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these statuses defined elsewhere in this file? Worth breaking out to an enum to ensure consistency within this file?
execution.py
Outdated
| if (is_error and include_error) or (not is_error and include_completed): | ||
| jobs.append(self._normalize_history_item(prompt_id, history_item)) | ||
|
|
||
| jobs = self._apply_sorting(jobs, sort_by, sort_order) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: apply sorting after limiting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we gotta sort first or we'll limit the wrong entries :(
execution.py
Outdated
| media_type == 'video' or | ||
| media_type == 'audio' or | ||
| filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')) or # images | ||
| filename.endswith(('.mp4', '.webm', '.mov', '.avi')) or # video |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bad way to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can refactor this a bit, but the frontend counts by checking file extensions and skipping animated
57fd19f to
c860cc6
Compare
|
Rebasing |
| IN_PROGRESS = 'in_progress' | ||
| COMPLETED = 'completed' | ||
| FAILED = 'failed' | ||
| CANCELLED = 'cancelled' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the cancelled status exist?
The /api/jobs endpoint will always return an empty list for this status, and normalize_history_item will treat this status as JobStatus.COMPLETED.
| status_param = query.get("status", None) | ||
| status_filter = None | ||
| if status_param: | ||
| status_filter = [s.strip() for s in status_param.split(',') if s.strip()] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the caller passes status=foo, then:
- status_filter becomes ["foo"].
- Filtered against
valid_statuses->[] - status_filter becomes
Noneand call returns full results with200 OK
That will be pretty surprising for the caller
| status=400 | ||
| ) | ||
|
|
||
| sort_order = query.get('sort_order', 'desc') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to add lowercase conversion here?
|
Can you test this properly? The endpoint doesn't work after I run a regular workflow. |
Summary
GET /api/jobsendpoint with filtering, sorting, and paginationpending,in_progress,completed,errorcreate_timeorexecution_timelimit/offsetGET /api/jobs/{job_id}for single job details with full outputsexecution_timein history for completed jobsoutputs_countandpreview_outputin job responsestype=output, supports images/video/audio/3D)Motivation
Currently
/queueand/historyare separate endpoints with different response shapes. This unified API provides:outputs_count,preview_output) for list views without fetching full outputsTest plan
pytest tests/execution/test_execution.py -v/api/jobsreturns jobs from queue and history on https://www.github.com/Comfy-Org/ComfyUI_frontend/commit/a57ea57a09a274efdc8444b949452509d7cf49de/api/jobs/{id}returns full job with outputsexecution_timeis tracked for completed jobs